---
title: "Orchestrator"
description: "Complex multi-step workflows with dependency management and state coordination."
---


<img src="https://www.anthropic.com/_next/image?url=https%3A%2F%2Fwww-cdn.anthropic.com%2Fimages%2F4zrzovbb%2Fwebsite%2F8985fc683fae4780fb34eab1365ab78c7e51bc8e-2401x1000.png&w=3840&q=75" alt="Orchestrator Workflow Pattern" />

<img width="1650" alt="Orchestrator execution example" src="https://github.com/user-attachments/assets/12263f81-f2f8-41e2-a758-13d764f782a1" />

## Overview

The Orchestrator pattern handles complex, multi-step tasks by planning dynamically, running independent tasks in parallel, and synthesizing the results. It breaks an objective into manageable steps and assigns each task to a specialized agent.

## Complete Implementation

The example below grades a student's short story: the orchestrator plans the work, delegates it to five specialized agents, and writes the resulting report to disk.

### Basic Orchestrator Setup

```python
import asyncio
import os
from mcp_agent.app import MCPApp
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.workflows.orchestrator.orchestrator import Orchestrator
from mcp_agent.workflows.llm.augmented_llm import RequestParams

app = MCPApp(name="assignment_grader_orchestrator")

async def run_orchestrator_example():
    async with app.run() as context:
        # Add current directory to filesystem server
        context.config.mcp.servers["filesystem"].args.extend([os.getcwd()])

        # Create specialized worker agents
        finder_agent = Agent(
            name="finder",
            instruction="""You are an agent with access to the filesystem and web fetching capabilities. 
            Your job is to identify the closest match to a user's request, make the appropriate tool calls, 
            and return the URI and CONTENTS of the closest match.""",
            server_names=["fetch", "filesystem"]
        )

        writer_agent = Agent(
            name="writer",
            instruction="""You are an agent that can write to the filesystem.
            You are tasked with taking the user's input, addressing it, and 
            writing the result to disk in the appropriate location.""",
            server_names=["filesystem"]
        )

        proofreader = Agent(
            name="proofreader",
            instruction="""Review the short story for grammar, spelling, and punctuation errors.
            Identify any awkward phrasing or structural issues that could improve clarity. 
            Provide detailed feedback on corrections.""",
            server_names=["fetch"]
        )

        fact_checker = Agent(
            name="fact_checker",
            instruction="""Verify the factual consistency within the story. Identify any contradictions,
            logical inconsistencies, or inaccuracies in the plot, character actions, or setting. 
            Highlight potential issues with reasoning or coherence.""",
            server_names=["fetch"]
        )

        style_enforcer = Agent(
            name="style_enforcer",
            instruction="""Analyze the story for adherence to style guidelines.
            Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to 
            enhance storytelling, readability, and engagement.""",
            server_names=["fetch"]
        )

        # Define the complex multi-step task
        task = """Load the student's short story from short_story.md, 
        and generate a report with feedback across proofreading, 
        factuality/logical consistency and style adherence. Use the style rules from 
        https://owl.purdue.edu/owl/research_and_citation/apa_style/apa_formatting_and_style_guide/general_format.html.
        Write the graded report to graded_report.md in the same directory as short_story.md"""

        # Create the orchestrator; plan_type selects the planning strategy
        orchestrator = Orchestrator(
            llm_factory=OpenAIAugmentedLLM,
            available_agents=[
                finder_agent,
                writer_agent,
                proofreader,
                fact_checker,
                style_enforcer,
            ],
            plan_type="full",  # Generate complete plan upfront
            name="assignment_grader",
        )

        # Execute with custom parameters
        result = await orchestrator.generate_str(
            message=task,
            request_params=RequestParams(model="gpt-4o")
        )
        
        print("Orchestrator Result:")
        print(result)

        # Display token usage analysis
        node = await orchestrator.get_token_node()
        if node:
            display_token_usage(node, context)

        return result

def display_token_usage(node, context, indent="", is_last=True):
    """Display hierarchical token usage from orchestrator execution"""
    connector = "└── " if is_last else "├── "
    usage = node.get_usage()
    cost = node.get_cost() if hasattr(node, "get_cost") else 0.0
    
    if usage.total_tokens > 0:
        cost_str = f" (${cost:.4f})" if cost > 0 else ""
        print(f"{indent}{connector}{node.name} [{node.node_type}]")
        print(f"{indent}{'    ' if is_last else '│   '}├─ Total: {usage.total_tokens:,} tokens{cost_str}")
        print(f"{indent}{'    ' if is_last else '│   '}├─ Input: {usage.input_tokens:,}")
        print(f"{indent}{'    ' if is_last else '│   '}└─ Output: {usage.output_tokens:,}")

        if node.children:
            child_indent = indent + ("    " if is_last else "│   ")
            for i, child in enumerate(node.children):
                display_token_usage(child, context, child_indent, i == len(node.children) - 1)

if __name__ == "__main__":
    asyncio.run(run_orchestrator_example())
```

### Advanced Planning Strategies

The orchestrator supports different planning approaches:

#### Full Planning
Generate the complete execution plan upfront:

```python
# Create orchestrator with full planning
orchestrator = Orchestrator(
    llm_factory=OpenAIAugmentedLLM,
    available_agents=agents,
    plan_type="full",  # Generate complete plan upfront
    max_planning_steps=10,
    name="full_planner",
)

# The orchestrator will create a plan like this:
# Step 1: Load the short story from short_story.md (finder_agent)
# Step 2: Generate feedback in parallel:
#   - Review for grammar/spelling (proofreader)
#   - Check factual consistency (fact_checker) 
#   - Evaluate style adherence (style_enforcer)
# Step 3: Compile feedback into report (writer_agent)
# Step 4: Write graded report to disk (writer_agent)
```

#### Iterative Planning
Plan one step at a time, adapting based on results:

```python
# Create orchestrator with iterative planning
orchestrator = Orchestrator(
    llm_factory=OpenAIAugmentedLLM,
    available_agents=agents,
    plan_type="iterative",  # Plan step by step
    max_iterations=20,
    name="iterative_planner",
)

# The orchestrator will:
# 1. Plan first step based on objective
# 2. Execute the step
# 3. Analyze results and plan next step
# 4. Repeat until objective is complete
```

### Configuration and Monitoring

```python
from mcp_agent.tracing.token_counter import TokenNode

# Advanced orchestrator configuration
orchestrator = Orchestrator(
    llm_factory=OpenAIAugmentedLLM,
    available_agents=agents,
    plan_type="full",
    
    # Execution parameters
    max_iterations=25,
    max_retries_per_task=3,
    parallel_execution=True,
    max_parallel_tasks=3,
    
    # Planning parameters  
    max_planning_steps=15,
    planning_temperature=0.7,
    planning_model="gpt-4o",
    
    # Monitoring
    enable_detailed_logging=True,
    name="production_orchestrator",
)

# Execute with monitoring
async def monitored_execution():
    result = await orchestrator.generate_str(
        message=complex_task,
        request_params=RequestParams(
            model="gpt-4o",
            temperature=0.3,
            max_tokens=4000
        )
    )
    
    # Get execution summary
    summary = await orchestrator.get_execution_summary()
    print(f"Total steps executed: {summary.steps_completed}")
    print(f"Parallel tasks run: {summary.parallel_tasks}")
    print(f"Total cost: ${summary.total_cost:.4f}")
    print(f"Execution time: {summary.execution_time:.2f}s")
    
    return result
```
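
Independent of whatever the orchestrator itself reports, wall-clock time can be measured around any awaitable call. The following is a minimal sketch using only the standard library; `timed` and `fake_generate` are hypothetical names, and `fake_generate` merely stands in for a call such as `orchestrator.generate_str`.

```python
import asyncio
import time
from typing import Awaitable, TypeVar

T = TypeVar("T")


async def timed(awaitable: Awaitable[T]) -> tuple[T, float]:
    """Await the given awaitable and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = await awaitable
    return result, time.perf_counter() - start


async def fake_generate(message: str) -> str:
    """Hypothetical stand-in for an orchestrator call; sleeps instead of calling a model."""
    await asyncio.sleep(0.01)
    return f"report for: {message}"


result, elapsed = asyncio.run(timed(fake_generate("grade the story")))
print(f"Execution time: {elapsed:.2f}s")
```

Wrapping the call this way keeps the timing logic separate from the workflow, so it can be reused for any other awaitable step.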

## Key Features

- **Dynamic Planning**: Breaks down complex objectives into manageable steps
- **Parallel Execution**: Tasks within each step run simultaneously
- **Iterative vs Full Planning**: Choose between adaptive and upfront planning
- **Context Preservation**: Previous results inform subsequent steps
- **Intelligent Synthesis**: Combines results from multiple specialized agents
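
To make the parallel execution and context preservation features concrete, here is a minimal sketch of the control flow using only `asyncio`. It is not the library's implementation: `Task`, `Step`, `run_task`, and `run_plan` are hypothetical names, and the worker is a stub that returns a string instead of calling a model.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Task:
    """One unit of work assigned to a named worker (hypothetical type)."""
    description: str
    agent: str


@dataclass
class Step:
    """A group of tasks that may run concurrently (hypothetical type)."""
    description: str
    tasks: list[Task] = field(default_factory=list)


async def run_task(task: Task, context: list[str]) -> str:
    """Stand-in for a worker agent call; a real worker would call an LLM."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return f"{task.agent}: {task.description} (saw {len(context)} prior results)"


async def run_plan(steps: list[Step]) -> list[str]:
    """Run steps in order; run the tasks inside each step concurrently."""
    results: list[str] = []
    for step in steps:
        # Every task in a step sees the same snapshot of earlier results.
        snapshot = list(results)
        step_results = await asyncio.gather(
            *(run_task(task, snapshot) for task in step.tasks)
        )
        results.extend(step_results)
    return results


plan = [
    Step("Load the story", [Task("load short_story.md", "finder")]),
    Step("Review in parallel", [
        Task("check grammar", "proofreader"),
        Task("check consistency", "fact_checker"),
        Task("check style", "style_enforcer"),
    ]),
    Step("Write the report", [Task("write graded_report.md", "writer")]),
]

results = asyncio.run(run_plan(plan))
for line in results:
    print(line)
```

Steps run sequentially because later steps depend on earlier results, while tasks inside a step are independent and can be gathered together. A synthesis stage would then combine `results` into a single answer.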

## Planning Modes

### Full Planning
```python
orchestrator = Orchestrator(
    llm_factory=OpenAIAugmentedLLM,
    available_agents=agents,
    plan_type="full",  # Generate complete plan upfront
)
```

### Iterative Planning
```python
orchestrator = Orchestrator(
    llm_factory=OpenAIAugmentedLLM,
    available_agents=agents,
    plan_type="iterative",  # Plan one step at a time
)
```
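
The difference between the two modes is mostly one of control flow. The sketch below contrasts them with stub planners; it is an illustration under assumptions, not the orchestrator's actual code. All names (`run_full`, `run_iterative`, `plan_all`, `plan_next`, `execute`) are hypothetical.

```python
from typing import Callable, Optional

# Hypothetical planner signatures used only for this illustration.
FullPlanner = Callable[[str], list[str]]
NextStepPlanner = Callable[[str, list[str]], Optional[str]]


def execute(step: str) -> str:
    """Stand-in for running one step with worker agents."""
    return f"done: {step}"


def run_full(objective: str, plan_all: FullPlanner) -> list[str]:
    """Full mode: plan once, then execute every planned step in order."""
    steps = plan_all(objective)
    return [execute(step) for step in steps]


def run_iterative(
    objective: str, plan_next: NextStepPlanner, max_iterations: int = 10
) -> list[str]:
    """Iterative mode: plan one step, execute it, then replan with the results."""
    results: list[str] = []
    for _ in range(max_iterations):
        step = plan_next(objective, results)
        if step is None:  # the planner signals that the objective is complete
            break
        results.append(execute(step))
    return results


def plan_all(objective: str) -> list[str]:
    """Stub planner that returns a fixed three-step plan."""
    return ["load story", "review story", "write report"]


def plan_next(objective: str, results: list[str]) -> Optional[str]:
    """Stub planner that picks the next step from how many results exist so far."""
    remaining = ["load story", "review story", "write report"]
    return remaining[len(results)] if len(results) < len(remaining) else None


full_results = run_full("grade the story", plan_all)
iterative_results = run_iterative("grade the story", plan_next)
print(full_results == iterative_results)
```

With these deterministic stubs both modes produce the same results. In practice, iterative planning trades extra planner calls for the ability to change course after each step, while full planning commits to the whole plan before any work starts.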

## Use Cases

### Complex Development Projects
Handle multi-step software development tasks with dependencies:
- **Code Refactoring**: Analyze codebase, identify issues, implement fixes across multiple files
- **Feature Implementation**: Requirements analysis, design, implementation, testing, documentation
- **Bug Resolution**: Reproduce issue, analyze root cause, implement fix, verify solution
- **CI/CD Pipeline Setup**: Configure build scripts, set up testing, deploy infrastructure

### Research and Analysis
Coordinate comprehensive research workflows:
- **Literature Reviews**: Search databases, analyze papers, synthesize findings, write summaries
- **Market Research**: Gather competitor data, analyze trends, survey customers, compile reports
- **Financial Analysis**: Collect financial data, perform calculations, generate insights, create presentations
- **Due Diligence**: Legal review, technical assessment, financial audit, risk analysis

### Content Production
Multi-stage content creation with quality assurance:
- **Technical Documentation**: Research topic, write content, review accuracy, format for publication
- **Marketing Campaigns**: Market analysis, content creation, design assets, campaign testing
- **Academic Papers**: Literature review, data analysis, writing, peer review, revision
- **Product Launches**: Requirements gathering, specification writing, testing, documentation

### Operations and Automation
Complex operational workflows requiring coordination:
- **Incident Response**: Alert analysis, impact assessment, resolution planning, implementation
- **Compliance Audits**: Data collection, policy review, gap analysis, remediation planning  
- **System Migrations**: Current state analysis, migration planning, execution, validation
- **Performance Optimization**: Monitoring analysis, bottleneck identification, optimization implementation

## Setup and Installation

Clone the repository and navigate to the orchestrator workflow example:

```bash
git clone https://github.com/lastmile-ai/mcp-agent.git
cd mcp-agent/examples/workflows/workflow_orchestrator_worker
```

Install dependencies:

```bash
pip install uv
uv sync
uv pip install -r requirements.txt
```

Configure your environment:

```bash
cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml
```

Add your API keys to `mcp_agent.secrets.yaml`:

```yaml
openai_api_key: "your-openai-api-key"
anthropic_api_key: "your-anthropic-api-key"  # optional
```

Create a sample story file for the grading example:

```bash
echo "The Battle of Glimmerwood

In the heart of Glimmerwood, a mystical forest knowed for its radiant trees, a small village thrived. 
The villagers, who were live peacefully, shared their home with the forest's magical creatures, 
especially the Glimmerfoxes whose fur shimmer like moonlight." > short_story.md
```

Optionally, enable tracing in `mcp_agent.config.yaml`:

```yaml
otel:
  enabled: true  # Enable OpenTelemetry tracing
```

Run the example:

```bash
uv run main.py
```

The orchestrator will:
1. Load the story from `short_story.md`
2. Run parallel analysis (grammar, style, factual consistency)
3. Compile feedback into a comprehensive report
4. Write the graded report to `graded_report.md`

## Expected Output

The orchestrator generates a detailed execution plan and produces output similar to:

```plaintext
=== ORCHESTRATOR EXECUTION ===
Planning steps:
  1. Load short story content from file
  2. Parallel analysis (grammar, style, facts)
  3. Compile comprehensive feedback report
  4. Write graded report to disk

Execution Summary:
  Total steps executed: 4
  Parallel tasks run: 3
  Total cost: $0.0234
  Execution time: 45.67s
```

The final graded report includes:
- Grammar and spelling corrections
- Style adherence feedback based on APA guidelines
- Factual consistency analysis
- Overall assessment and recommendations

<Card title="Full Implementation" href="https://github.com/lastmile-ai/mcp-agent/tree/main/examples/workflows/workflow_orchestrator_worker">
  See the complete orchestrator example with student assignment grading workflow.
</Card>